package com.hefan.schedule.taskmanager;

import com.hefan.schedule.model.ScheduleServer;
import com.hefan.schedule.model.ScheduleTaskType;
import com.hefan.schedule.service.IScheduleProcessor;
import com.hefan.schedule.service.IScheduleTaskDeal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Task processor that runs the task bean on a worker thread under the control of a
 * {@link ScheduleManager}.
 *
 * <p>This variant does not load a task list: the worker thread calls
 * {@link IScheduleTaskDeal#execute} directly and keeps retrying until the call reports success
 * or {@link #stopSchedule()} has been requested.
 *
 * <p>Thread-safety: {@link #isStopSchedule} and {@link #isSleeping} are {@code volatile} because
 * they are written by one thread and read by another (stop requests and sleep-state queries come
 * from outside the worker thread).
 *
 * @author wangchao
 * @version 1.0.0
 * @since 2016/10/31 15:34:59
 */
class ScheduleProcessorSleep<T> implements IScheduleProcessor, Runnable {

  private static final Logger logger = LoggerFactory.getLogger(ScheduleProcessorSleep.class);

  final LockObject m_lockObject = new LockObject();
  final Object lockVersionObject = new Object();
  final Object lockRunningList = new Object();
  /**
   * The task manager that owns this processor.
   */
  protected ScheduleManager scheduleManager;
  /**
   * The bean that actually performs the task.
   */
  protected IScheduleTaskDeal taskDealBean;

  /**
   * Version number of the current task queue.
   */
  protected long taskListVersion = 0;
  List<Thread> threadList = new CopyOnWriteArrayList<Thread>();
  /**
   * The task type configuration.
   */
  ScheduleTaskType taskTypeInfo;

  /**
   * Set once the user has asked to stop scheduling. Volatile so that the worker thread sees the
   * request made by the thread calling {@link #stopSchedule()}.
   */
  volatile boolean isStopSchedule = false;

  /**
   * True only while the worker thread is in its post-execution sleep. Volatile so that
   * {@link #isSleeping()} callers on other threads observe the current value.
   */
  volatile boolean isSleeping = false;

  StatisticsInfo statisticsInfo;

  /**
   * Creates a processor and immediately starts its single worker thread.
   *
   * @param aManager        the manager supplying the task type, schedule server and system time
   * @param aTaskDealBean   the bean whose {@code execute} method performs the task
   * @param aStatisticsInfo the counters updated with success/failure numbers and spend time
   * @throws Exception declared on the signature for callers; not thrown explicitly here
   */
  public ScheduleProcessorSleep(ScheduleManager aManager, IScheduleTaskDeal aTaskDealBean, StatisticsInfo aStatisticsInfo) throws Exception {
    this.scheduleManager = aManager;
    this.statisticsInfo = aStatisticsInfo;
    this.taskTypeInfo = this.scheduleManager.getTaskTypeInfo();
    this.taskDealBean = aTaskDealBean;
    if (taskTypeInfo.getFetchDataNumber() < taskTypeInfo.getThreadNumber() * 10) {
      logger.warn("参数设置不合理，系统性能不佳。【每次从数据库获取的数量fetchnum】 >= 【线程数量threadnum】 *【最少循环次数10】 ");
    }
    // Only one worker thread (index 0) is started, regardless of the configured thread number.
    this.startThread(0);
  }

  /**
   * Requests that scheduling stop. The worker thread notices the flag before its next execution
   * attempt and then exits.
   *
   * <p>Note that unregistering the schedule server from the configuration center must only
   * happen once all threads have exited; that is done by the last exiting worker in
   * {@link #run()}.
   *
   * @throws Exception declared on the signature for callers; not thrown explicitly here
   */
  public void stopSchedule() throws Exception {
    this.isStopSchedule = true;
  }

  /**
   * Creates, registers, names and starts one worker thread running this processor.
   *
   * @param index suffix used in the thread name ({@code <taskType>-<serial>-exe<index>})
   */
  private void startThread(int index) {
    Thread thread = new Thread(this);
    threadList.add(thread);
    String threadName = this.scheduleManager.getScheduleServer().getTaskType() + "-" + this.scheduleManager.getCurrentSerialNumber() + "-exe" + index;
    thread.setName(threadName);
    thread.start();
  }

  /**
   * @return {@code true} while the worker thread is in its post-execution sleep
   */
  public boolean isSleeping() {
    return this.isSleeping;
  }

  /**
   * Worker loop: executes the task bean until it succeeds or a stop is requested, updates the
   * statistics for every attempt, then pauses and exits.
   *
   * <p>If the thread is interrupted while sleeping, the interrupt status is restored before the
   * method returns so that code further up the stack can still observe it.
   */
  @Override
  public void run() {
    try {
      long startTime = 0;
      boolean isSuccess = false;
      // The inner loop is only left on success (or by returning on stop), so this outer loop
      // performs a single pass and exits at the isSuccess check at the bottom.
      while (true) {
        this.m_lockObject.addThread();
        while (true) {
          if (this.isStopSchedule) {
            // Stop requested: release this thread's slot and wake up every waiting thread.
            this.m_lockObject.realseThread();
            this.m_lockObject.notifyOtherThread();
            synchronized (this.threadList) {
              this.threadList.remove(Thread.currentThread());
              // The last worker to leave unregisters the schedule server.
              if (this.threadList.size() == 0) {
                this.scheduleManager.unRegisterScheduleServer();
              }
            }
            return;
          }

          ScheduleTaskType taskType = scheduleManager.getTaskTypeInfo();
          // NOTE(review): a failed or throwing execute is retried immediately with no back-off;
          // confirm this is intended.
          try {
            startTime = scheduleManager.scheduleCenter.getSystemTime();
            ScheduleServer scheduleServer = scheduleManager.getScheduleServer();
            // TODO: switch to an annotation-based approach later
            if (this.taskDealBean.execute(scheduleServer)) {
              // TODO: add an execution record after a successful run
              logger.info("执行Task({})成功！", taskType.getBaseTaskType());

              addSuccessNum(1, scheduleManager.scheduleCenter.getSystemTime() - startTime, "com..schedule.ScheduleProcessorSleep.run");
              isSuccess = true;
              break;
            } else {
              logger.info("执行Task({})失败！", taskType.getBaseTaskType());
              addFailNum(1, scheduleManager.scheduleCenter.getSystemTime() - startTime, "com..schedule.ScheduleProcessorSleep.run");
            }
          } catch (Throwable ex) {
            // Logged at ERROR so that task exceptions are not lost when INFO is filtered out.
            logger.error("执行Task(" + taskType.getBaseTaskType() + ")异常！", ex);
            addFailNum(1, scheduleManager.scheduleCenter.getSystemTime() - startTime, "com..schedule.ScheduleProcessorSleep.run");
          }
        }

        if (logger.isTraceEnabled()) {
          logger.trace("{}：当前运行线程数量:{}", Thread.currentThread().getName(), this.m_lockObject.count());
        }
        if (!this.m_lockObject.realseThreadButNotLast()) {
          Thread.sleep(100);
          startTime = scheduleManager.scheduleCenter.getSystemTime();

          // No data loading happens in this variant; decide whether to sleep or to wake the
          // other threads so they can exit.
          if (!this.isStopSchedule && this.scheduleManager.isContinueWhenData()) {
            if (logger.isTraceEnabled()) {
              logger.trace("没有装载到数据，start sleep");
            }
            this.isSleeping = true;
            try {
              Thread.sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData());
            } finally {
              // Always clear the flag, even when the sleep is interrupted.
              this.isSleeping = false;
            }

            if (logger.isTraceEnabled()) {
              logger.trace("Sleep end");
            }
          } else {
            // Not continuing: wake up all waiting threads.
            this.m_lockObject.notifyOtherThread();
          }
          this.m_lockObject.realseThread();
        } else {
          // Not the last running thread: park until another thread notifies.
          if (logger.isTraceEnabled()) {
            logger.trace("不是最后一个线程，sleep");
          }
          this.m_lockObject.waitCurrentThread();
        }
        if (isSuccess) {
          logger.info("定时任务执行成功后，退出当前线程");
          break;
        }
      }
    } catch (InterruptedException e) {
      // Restore the interrupt status that was cleared when the exception was thrown.
      // NOTE(review): this exit path does not release the lock count or remove the thread from
      // threadList, same as the generic failure path below; confirm whether cleanup is needed.
      Thread.currentThread().interrupt();
      logger.error(e.getMessage(), e);
    } catch (Throwable e) {
      logger.error(e.getMessage(), e);
    }
  }

  /**
   * Records one fetch operation and the number of items fetched.
   *
   * @param num  number of items fetched
   * @param addr caller location label; currently unused
   */
  public void addFetchNum(long num, String addr) {
    this.statisticsInfo.addFetchDataCount(1);
    this.statisticsInfo.addFetchDataNum(num);
  }

  /**
   * Records successful executions and the time they took.
   *
   * @param num       number of successes to add
   * @param spendTime time spent, as a difference of two manager system-time readings
   * @param addr      caller location label; currently unused
   */
  public void addSuccessNum(long num, long spendTime, String addr) {
    this.statisticsInfo.addDealDataSucess(num);
    this.statisticsInfo.addDealSpendTime(spendTime);
  }

  /**
   * Records failed executions and the time they took.
   *
   * @param num       number of failures to add
   * @param spendTime time spent, as a difference of two manager system-time readings
   * @param addr      caller location label; currently unused
   */
  public void addFailNum(long num, long spendTime, String addr) {
    this.statisticsInfo.addDealDataFail(num);
    this.statisticsInfo.addDealSpendTime(spendTime);
  }
}
